package org.zjvis.datascience.service;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.zjvis.datascience.common.dto.PipelineDTO;
import org.zjvis.datascience.common.dto.PipelineInstanceDTO;
import org.zjvis.datascience.common.dto.TaskDTO;
import org.zjvis.datascience.common.dto.TaskInstanceDTO;
import org.zjvis.datascience.common.enums.ActionEnum;
import org.zjvis.datascience.common.enums.TaskInstanceStatus;
import org.zjvis.datascience.common.exception.BaseErrorCode;
import org.zjvis.datascience.common.exception.DataScienceException;
import org.zjvis.datascience.common.util.JwtUtil;
import org.zjvis.datascience.common.vo.TaskInstanceVO;
import org.zjvis.datascience.service.dag.DAGScheduler;
import org.zjvis.datascience.service.mapper.PipelineMapper;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * @description Pipeline数据视图 Service
 * @date 2021-10-29
 */
@Service
public class PipelineService {

    @Autowired
    private PipelineMapper pipelineMapper;

    @Lazy
    @Autowired
    private TaskService taskService;

    @Autowired
    @Lazy
    private PipelineInstanceService pipelineInstanceService;

    @Autowired
    private TaskInstanceService taskInstanceService;

    @Autowired
    private ActionService actionService;

    @Autowired
    private DAGScheduler dagScheduler;

    public PipelineDTO queryById(Long id) {
        return pipelineMapper.queryById(id);
    }

    public List<PipelineDTO> queryByProject(Long projectId) {
        return pipelineMapper.queryByProject(projectId);
    }

    public Long save(PipelineDTO pipeline) {
        pipelineMapper.save(pipeline);
        return pipeline.getId();
    }

    public void update(PipelineDTO pipeline) {
        pipelineMapper.update(pipeline);
    }

    public void delete(Long id) {
        Map<String, Object> params = Maps.newHashMap();
        params.put("ids", new Long[]{id});
        pipelineMapper.delete(params);
    }

    public void delete(Long userId, List<Long> ids) {
        if (CollectionUtils.isEmpty(ids)) {
            return;
        }
        Map<String, Object> params = Maps.newHashMap();
        params.put("ids", ids);
        params.put("userId", userId);
        pipelineMapper.delete(params);
    }


    private boolean copyTasks(List<TaskDTO> tasks, Long pipelineId) {
        // newID, oldParentId
        Map<Long, String> oldRelationship = Maps.newHashMap();
        // oldId, newId
        Map<Long, Long> taskPairs = Maps.newHashMap();
        Long userId = JwtUtil.getCurrentUserId();
        for (TaskDTO task : tasks) {
            Long oldId = task.getId();
            task.setId(null);
            task.setUserId(userId);
            task.setGmtModify(null);
            task.setGmtCreate(null);
            task.setPipelineId(pipelineId);
            Long newId = taskService.simpleSave(task);
            String parentId = task.getParentId();
            if (StringUtils.isNotEmpty(parentId)) {
                oldRelationship.put(newId, parentId);
            }
            taskPairs.put(oldId, newId);
        }
        // update relationship
        for (Map.Entry<Long, String> entry : oldRelationship.entrySet()) {
            Long newId = entry.getKey();
            String[] parentIds = entry.getValue().split(",");
            TaskDTO taskDTO = new TaskDTO();
            taskDTO.setId(newId);
            List<String> newParentId = new ArrayList<>();
            for (String id : parentIds) {
                newParentId.add(taskPairs.get(Long.parseLong(id)).toString());
            }
            taskDTO.setParentId(Joiner.on(",").join(newParentId));
            taskService.update(taskDTO);
        }
        return true;
    }

    @Transactional(rollbackFor = SQLException.class)
    public Long copy(Long oldPipelineId) {
        List<TaskDTO> tasks = taskService.queryByPipeline(oldPipelineId);
        PipelineDTO newDto = queryById(oldPipelineId);
        newDto.setId(null);
        newDto.setUserId(JwtUtil.getCurrentUserId());
        newDto.setName(String.format("%s_copy", newDto.getName()));
        newDto.setGmtCreate(null);
        newDto.setGmtModify(null);
        Long pipelineId = this.save(newDto);
        newDto.setName(pipelineId + newDto.getName());
        this.update(newDto);
        copyTasks(tasks, pipelineId);
        return pipelineId;
    }

    @Transactional(rollbackFor = Exception.class)
    public void deleteByProject(long projectId) {
        deleteTasksByProject(projectId);
        pipelineMapper.deleteByProject(projectId);
    }

    /**
     * 多个session日志查询
     *
     * @param sessions
     * @return
     */
    public JSONObject queryStatus(String sessions) {
        JSONObject result = new JSONObject();
        String[] tmpSessions = sessions.split(",");
        JSONArray jsonArray = new JSONArray();
        for (String session : tmpSessions) {
            PipelineInstanceDTO pi = pipelineInstanceService.queryById(Long.parseLong(session));
            List<TaskInstanceDTO> instances = taskInstanceService.queryBySessionId(Long.parseLong(session));
            Map<Long, TaskInstanceVO> taskMap = Maps.newHashMap();
            List<String> totalLogs = new ArrayList<>();
            boolean isFinished = true;
            JSONObject jsonObject = new JSONObject();
            for (TaskInstanceDTO instance : instances) {
                taskMap.put(instance.getTaskId(), instance.view());
                String sb =
                    String.format("[%s_%s] ", instance.getTaskName(), instance.getTaskId()) +
                        String.format("[%s] ", instance.getStatus()) +
                        String.format("[%s]",
                            instance.getLogInfo() == null ? "" : instance.getLogInfo());
                totalLogs.add(sb);
                String status = instance.getStatus();
                if (status.equals(TaskInstanceStatus.RUNNING.toString()) || status
                    .equals(TaskInstanceStatus.CREATE.toString())) {
                    isFinished = false;
                }
            }
            jsonObject.put("sessionId", Long.parseLong(session));
            // 只有任务没有结束，都是RUNNING状态
            jsonObject
                .put("status", isFinished ? pi.getStatus() : TaskInstanceStatus.RUNNING.toString());
            jsonObject.put("taskMap", taskMap);
            jsonObject.put("totalLogs", totalLogs);
            jsonArray.add(jsonObject);
        }
        result.put("sessionLogs", jsonArray);
        System.out.println(result.toJSONString());
        return result;
    }

    /**
     * 删除项目的所有任务入口
     *
     * @param projectId
     */
    private void deleteTasksByProject(long projectId) {
        List<PipelineDTO> pipelines = queryByProject(projectId);
        if (CollectionUtils.isEmpty(pipelines)) {
            return;
        }
        List<Long> pipelineIds = pipelines.stream().mapToLong(PipelineDTO::getId).boxed()
            .collect(Collectors.toList());
        taskService.deleteWithRelevantData(pipelineIds, projectId);
    }

    public PipelineDTO checkExist(long id) {
        PipelineDTO dto = pipelineMapper.queryById(id);
        if (dto == null) {
            throw DataScienceException.of(BaseErrorCode.PIPELINE_NOT_EXIST, "id:" + id);
        }
        return dto;
    }

    /**
     * 删除pipeline的所有任务入口
     *
     * @param pipelineId
     */
    public void deleteTasksByPipeline(Long pipelineId) {
        PipelineDTO pipelineDTO = queryById(pipelineId);
        taskService.deletePipelineRelevantData(pipelineId, pipelineDTO.getProjectId());
    }

    public PipelineDTO queryByProjectAndUser(Long projectId, Long userId) {
        return pipelineMapper.queryByProjectAndUser(projectId, userId).get(0);
    }

    /**
     * 清除pipeline中所有task
     *
     * @param pipelineId
     */
    public void clean(Long pipelineId) {
        List<Long> taskIds = taskService.queryIdByPepelineId(pipelineId);
        if (taskIds.size() == 0) {
            return;
        }
        List<TaskDTO> deletedTasks = new ArrayList<>();
        JSONObject context = new JSONObject();
        taskService.batchDelete(context, taskIds, deletedTasks, pipelineId);

        actionService.addActionForTask(ActionEnum.DELETE, deletedTasks, null, null, context);
    }

    public void checkAuth(long pipelineId) {
        long userId = JwtUtil.getCurrentUserId();
        if (userId == 0) {
            throw new DataScienceException(BaseErrorCode.UNAUTHORIZED);
        }

        PipelineDTO pipeline = pipelineMapper.queryById(pipelineId);

        if (pipeline == null || JwtUtil.getCurrentUserId() != pipeline.getUserId()) {
            throw new DataScienceException(BaseErrorCode.PIPELINE_NO_OPERATION_PERMISSION);
        }
    }

    public Long traceMostHeadTask(String taskIdStr) {
        //taskId -> 0L 表示全部执行
        //taskId -> 指定Id 表示执行单个节点
        //多个taskId 表示部分执行，找到最顶级的节点，然后从左到右调度
        if (taskIdStr.contains(",")) {
            List<Long> ids = Arrays.stream(taskIdStr.split(",")).map(Long::parseLong).collect(Collectors.toList());
            // find LCA in DAG
            return dagScheduler.findLCA(ids);
        } else {
            return Long.parseLong(taskIdStr);
        }
    }

    public boolean pipelineExists(Long projectId, Long userId) {
        List<PipelineDTO> res = pipelineMapper.queryByProjectAndUser(projectId, userId);
        return res == null || res.isEmpty()? false : true;
    }
}
